# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
import numpy as np
from hysop.tools.decorators import debug
from hysop.tools.htypes import to_tuple, first_not_None
from hysop.tools.sympy_utils import subscript
from hysop.core.mpi import MPI
[docs]
class TaggedObjectView(metaclass=ABCMeta):
"""View on a TaggedObject, just forwards tag and id for views."""
@debug
def __new__(cls, obj_view=None, **kwds):
obj = super().__new__(cls, **kwds)
obj.__obj_view = obj_view or obj
return obj
@debug
def __init__(self, obj_view=None, **kwds):
if obj_view is not None:
self.__obj_view = obj_view
assert isinstance(self.__obj_view, TaggedObject)
super().__init__(**kwds)
def __get_object_id(self):
"""Unique id of the underlying object view."""
if (not hasattr(self, "_TaggedObjectView__obj_view")) or (
self.__obj_view is None
):
assert isinstance(self, TaggedObject)
return getattr(self, "_TaggedObject__get_object_id")()
else:
return getattr(self.__obj_view, "_TaggedObject__get_object_id")()
def __get_object_tag(self):
"""Unique tag of the underlying object view."""
if (not hasattr(self, "_TaggedObjectView__obj_view")) or (
self.__obj_view is None
):
assert isinstance(self, TaggedObject)
return getattr(self, "_TaggedObject__get_object_tag")()
else:
return getattr(self.__obj_view, "_TaggedObject__get_object_tag")()
def __get_object_pretty_tag(self):
"""Unique pretty tag of the underlying object view."""
if (not hasattr(self, "_TaggedObjectView__obj_view")) or (
self.__obj_view is None
):
assert isinstance(self, TaggedObject)
return getattr(self, "_TaggedObject__get_object_pretty_tag")()
else:
return getattr(self.__obj_view, "_TaggedObject__get_object_pretty_tag")()
def __get_object_full_tag(self):
"""Unique tag of the underlying object view with cls information."""
return f"{self.__class__.__name__}::{self.__get_object_tag()}"
def __get_object_full_pretty_tag(self):
"""Unique tag of the underlying object view with cls information."""
return f"{self.__class__.__name__}::{self.__get_object_pretty_tag()}"
id = property(__get_object_id)
tag = property(__get_object_tag)
pretty_tag = property(__get_object_pretty_tag)
full_tag = property(__get_object_full_tag)
full_pretty_tag = property(__get_object_full_pretty_tag)
@abstractmethod
def __eq__(self, other):
pass
@abstractmethod
def __ne__(self, other):
pass
@abstractmethod
def __hash__(self):
pass
def __repr__(self):
return self.full_tag
[docs]
class TaggedObject(metaclass=ABCMeta):
"""
Generic class to count object instances and associate a tag to it.
A tag is basically the id of the object instance formatted to a string.
This make it simpler to visually identify objects compared to python
object id (for logging or debug purposes).
"""
# Counter of instances to set a unique id for each object.
__ids = {}
[docs]
@debug
def __new__(
cls,
tag_prefix=None,
tag_postfix=None,
tag_formatter=None,
tagged_cls=None,
**kwds,
):
"""
Create a TaggedObject object and assign it an id.
"""
assert (tag_prefix is None) or isinstance(tag_prefix, str)
assert (tag_postfix is None) or isinstance(tag_postfix, str)
assert (tag_formatter is None) or callable(tag_formatter)
tagged_cls = first_not_None(tagged_cls, cls)
try:
obj = super().__new__(cls, **kwds)
except TypeError:
msg = f"\nFATAL ERROR during {cls.__name__}.__new__(cls, **kwds)."
msg += "\nThis may be due to extra keyword arguments:\n *"
msg += "\n *".join(kwds.keys())
msg += "\nIf you believe that those arguments are valid, check the following types: \n *"
msg += "\n *".join(map(str, cls.__mro__[:-1]))
msg += "\n"
print(msg)
raise
if tagged_cls in TaggedObject.__ids:
obj.__tag_id = TaggedObject.__ids[tagged_cls]
TaggedObject.__ids[tagged_cls] += 1
else:
obj.__tag_id = 0
TaggedObject.__ids[tagged_cls] = 1
obj.__tag_prefix = tag_prefix
obj.__tag_postfix = tag_postfix
obj.__tag_formatter = tag_formatter
return obj
@debug
def __init__(
self,
tag_prefix=None,
tag_postfix=None,
tag_formatter=None,
tagged_cls=None,
**kwds,
):
"""
Initialize a TaggedObject with a tag prefix/postfix/formatter, all optional.
"""
assert (tag_prefix is None) or isinstance(tag_prefix, str)
assert (tag_postfix is None) or isinstance(tag_postfix, str)
assert (tag_formatter is None) or callable(tag_formatter)
try:
super().__init__(**kwds)
except TypeError:
cls = self.__class__
msg = f"\nFATAL ERROR during {cls.__name__}.__init__(**kwds)."
msg += "\nThis may be due to extra keyword arguments:\n *"
msg += "\n *".join(kwds.keys())
msg += "\nIf you believe that those arguments are valid, check the following types: \n *"
msg += "\n *".join(map(str, cls.__mro__[:-1]))
print(msg)
raise
# reaffect attributes (some classes use only __init__ for simplicity)
self.__tag_prefix = first_not_None(self.__tag_prefix, tag_prefix, "")
self.__tag_postfix = first_not_None(self.__tag_postfix, tag_postfix, "")
self.__tag_formatter = first_not_None(self.__tag_formatter, tag_formatter)
def __get_object_id(self):
"""
Return the id of the present object tag id.
This is not the same as the python object id.
"""
return self.__tag_id
def __get_object_tag(self):
"""
Get the formatted tag of this object as a string.
This is an instance identifier
"""
tag_formatter = first_not_None(self.__tag_formatter, lambda x: x)
tag_prefix = first_not_None(self.__tag_prefix, "")
tag_postfix = first_not_None(self.__tag_postfix, "")
return "{}{}{}".format(tag_prefix, tag_formatter(self.__tag_id), tag_postfix)
def __get_object_pretty_tag(self):
"""
Get the formatted pretty tag of this object as a string.
This is an instance identifier
"""
tag_formatter = first_not_None(self.__tag_formatter, lambda x: x)
tag_prefix = first_not_None(self.__tag_prefix, "")
tag_postfix = first_not_None(self.__tag_postfix, "")
return "{}{}{}".format(
tag_prefix, tag_formatter(subscript(self.__tag_id)), tag_postfix
)
def __get_object_full_tag(self):
"""
Get the formatted tag of this object as a string.
This is an instance identifier along with a
class identifier.
"""
return f"{self.__class__.__name__}::{self.__get_object_tag()}"
def __get_object_full_pretty_tag(self):
"""
Get the formatted tag of this object as a string.
This is an instance identifier along with a
class identifier.
"""
return f"{self.__class__.__name__}::{self.__get_object_pretty_tag()}"
id = property(__get_object_id)
tag = property(__get_object_tag)
pretty_tag = property(__get_object_pretty_tag)
full_tag = property(__get_object_full_tag)
full_pretty_tag = property(__get_object_full_pretty_tag)
def __repr__(self):
return self.full_tag
@abstractmethod
def __eq__(self, other):
pass
@abstractmethod
def __ne__(self, other):
pass
@abstractmethod
def __hash__(self):
pass
[docs]
class RegisteredObject(TaggedObject):
"""
Generic class to manage unique immutable object instances (like Topologies,
Domains and so on). Has a TaggedObject interface but each tag id is unique among
all objects of the same class created with the exact same hashed __new__ arguments.
"""
# Counter of instances to set a unique id for each unique object.
__registered_objects = {}
@classmethod
def __reset(cls):
cls.__registered_objects.clear()
@classmethod
def _format_arg(cls, key, arg):
"""
Check if all arguments are hashable (and immutable).
If object is not hashable, an error is raised.
"""
if arg is None:
msg = "Keyword argument {} is None and it should be replaced by its default value, "
msg += "in child classes __new__ method."
msg = msg.format(key)
raise ValueError(msg)
elif isinstance(arg, list):
msg = "Keyword argument {} is a list, should be a tuple."
msg = msg.format(key)
raise TypeError(msg)
elif isinstance(arg, set):
msg = "Keyword argument {} is a set, should be a frozenset."
msg = msg.format(key)
raise TypeError(msg)
elif isinstance(arg, dict):
msg = "Keyword argument {} is a dict but it should be a tuple of tuples."
msg = msg.format(key)
raise TypeError(msg)
elif isinstance(arg, np.ndarray):
if arg.flags.writeable:
msg = "Keyword argument {} is a np.ndarray but it has not been set to readonly."
msg = msg.format(key)
raise RuntimeError(msg)
return arg.tobytes()
elif isinstance(arg, (MPI.Intracomm, MPI.Intercomm)):
return id(arg)
else:
try:
hash(arg)
except TypeError:
msg = "\nFATAL ERROR: Cannot hash argument {} with type {} "
msg += "which is required for RegisteredObject derived types keyword arguments.\n"
msg = msg.format(key, type(arg))
print(msg)
raise
return arg
[docs]
@debug
def __new__(
cls,
register_object=True,
tag_prefix=None,
tag_postfix=None,
tag_formatter=None,
**kwds,
):
r"""
Creates and return a RegisteredObject and assign a unique id if
and only if a previous object was not already created with the
exact same keywords arguments (modulo hashing).
If the object was already created, it's instance is simply returned.
All keyword arguments contained in **kwds have to be immutable:
*Arguments cannot be None (you have to set all default arguments
in child classes __new__).
*Lists should be replaced by tuples.
*Sets should be replaced by frozensets.
*Dictionaries should be internally stored as tuple of tuples (use dict.items()).
*np.ndarray should be set read-only (by using npw.set_readonly).
*MPI.Intracomm and MPI.Intercomm are hashed with their python object id,
child classes are responsible for the immutability of those parameters.
Initialization should be done if and only if self._initialized is False.
/!\ Always pass current class __new__ arguments to the base class __new__.
Full modification input keywords arguments should be done in __new__,
before base class __new__ call.
All input keyword arguments should be formatted to the exact same immutable
type and default values should be precomputed, otherwise two different
RegisteredObject might be created even if the final objects are the same
after initialization.
Full initialization of object should be done in __new__ after base class __new__
guarded by initialization safe guard (if obj.obj_initialized).
[modify original **kwds to immutable objects]
obj = super(CurrentClass,cls).__new__(cls, [current __new__ **kwds], **kwds)
if not obj.obj_initialized:
[initialize returned object (which is a new instance)]
return obj
/!\ Only define __init__ if absolutely required (for multiple inheritance of custom
objects for example and if you know what you are doing).
__init__ default arguments will not be the ones from __new__ if modified.
self.__init__(**kwds) is always called, even if the object was already
created, a safeguard should be set in each child class __init__ method,
if present. User arguments are passed as *user supplied* from __new__
to __init__. This means that all default arguments modified in __new__
are *not* passed modified to __init__ and thus that all class parameters
should be set in __new__. When using __init__, its arguments should not be
used.
"""
if register_object:
key = (cls,) + tuple(
cls._format_arg(k, kwds[k]) for k in sorted(kwds.keys())
)
registered_objects = cls.__registered_objects
if key in registered_objects:
obj = registered_objects[key]
assert type(obj) is cls, "FATAL ERROR: Type mismatch."
assert obj.obj_initialized
else:
obj = super().__new__(
cls,
tag_prefix=tag_prefix,
tag_postfix=tag_postfix,
tag_formatter=tag_formatter,
)
obj.__initialized = False
registered_objects[key] = obj
else:
obj = super().__new__(
cls,
tag_prefix=tag_prefix,
tag_postfix=tag_postfix,
tag_formatter=tag_formatter,
)
obj.__initialized = False
return obj
@debug
def __init__(
self,
register_object=True,
tag_prefix=None,
tag_postfix=None,
tag_formatter=None,
**kwds,
):
"""
Initialize this object.
If self._initialized was already set to True, raise a RuntimeError.
Else, set self._initialized to True.
"""
if self.__initialized:
return
super().__init__(
tag_prefix=tag_prefix, tag_postfix=tag_postfix, tag_formatter=tag_formatter
)
self.__initialized = True
def __del__(self):
key = None
for k, v in self.__registered_objects.items():
if v is self:
key = k
if key is not None:
del self.__registered_objects[key]
def __get_obj_initialized(self):
"""
Return the object initialization state.
"""
return self.__initialized
# default python comparisson and hash because we have unique instances
[docs]
def __eq__(self, other):
"""Check if two instances are the same."""
return id(self) == id(other)
[docs]
def __ne__(self, other):
"""Check if two instances are not the same."""
return id(self) != id(other)
[docs]
def __hash__(self):
"""Return the python object id of this instance."""
return id(self)
obj_initialized = property(__get_obj_initialized)
[docs]
class Handle(metaclass=ABCMeta):
"""
Generic class to encapsulate various objects ('handles').
"""
[docs]
@abstractmethod
def handle_cls(cls):
"""
Returns the wrapped python type.
"""
pass
[docs]
@abstractmethod
def wrap(cls, handle, **kargs):
"""
Build a wrapped instance of handle.
"""
pass
@debug
def __init__(self, handle, **kargs):
"""
Build a Handle from a handle instance.
Raise ValueError if given handle is None.
Raise TypeError if given handle does not match targetted
handle type.
"""
super().__init__(**kargs)
cls = self.__class__
handle_cls = cls.handle_cls()
if handle is None:
msg = "Handle cannot be None."
raise ValueError(msg)
if not isinstance(handle, handle_cls):
msg = "Handle is not a {} but a {}."
msg = msg.format(handle_cls, handle.__class__)
raise TypeError(msg)
self._cls = cls
self._handle = handle
[docs]
def handle(self):
"""
Return encapsulated handle instance.
"""
return self._handle